'''
Author: ica caijianling@outlook.com
Date: 2024-05-08 08:28:32
LastEditors: ica caijianling@outlook.com
LastEditTime: 2024-05-08 10:54:44
FilePath: \dy-llm-voice\main.py
Description: 这是默认设置,请设置`customMade`, 打开koroFileHeader查看配置 进行设置: https://github.com/OBKoro1/koro1FileHeader/wiki/%E9%85%8D%E7%BD%AE
'''
import json
import asyncio
import websockets
import aiofiles
import os
import tempfile
import threading
import aiohttp
import queue
import time
# 这里需要使用一个音频播放库，例如pydub
from pydub import AudioSegment
from pydub.playback import play

# 配置文件路径
config_file_path = 'config.json'


# 读取配置文件
with open(config_file_path, 'r') as f:
    config = json.load(f)

# 获取配置值
# WebSocket服务器地址
ws_url = config['ws_url']
# 大语言模型API地址
model_api_url = config['model_api_url']
# 语音合成服务地址
tts_api_url = config['tts_api_url']
# 模型Key
model_key = config['model_key']
# 模型代码
model_code = config['model_code']
# 语音的语言
tts_language = config['tts_language']

# 大语言模型API的请求头
model_api_headers = {
    'Authorization': f"Bearer {model_key}",
    'Content-Type': 'application/json'
}

# 大语言模型API的请求体模板
model_api_payload = {
    "model": model_code,
    "messages": [
        {
            "content": "",
            "role": "user"
        }
    ]
}

# 错误处理函数
def handle_error(error):
    print(f"An error occurred: {error}")

# 异步大语言模型请求函数
async def ai_fetch_async(session, url, payload):
    try:
        response = await session.post(url, headers=model_api_headers, json=payload)
        return await response.json()
    except Exception as e:
        handle_error(e)

# 异步请求TTS语音函数
async def tts_fetch_async(session, url, text):
    try:
        payload = {
            'text': text,
            'language_code': tts_language
        }
        response = await session.post(url, data=payload)
        return await response.json()
    except Exception as e:
        handle_error(e)

# 音频播放队列
audio_queue = queue.Queue()
playing = False

# 异步播放音频函数
async def play_audio_async(session, audio_url):
    try:
        # 创建临时文件
        tmp_folder = 'tmp'  # 临时文件夹名称
        if not os.path.exists(tmp_folder):
            os.makedirs(tmp_folder)  # 如果文件夹不存在，创建它
        temp_file_path = os.path.join(tmp_folder, next(tempfile._get_candidate_names()) + '.mp3')

        # 下载音频
        response = await session.get(audio_url)
        audio_data = await response.read()
        async with aiofiles.open(temp_file_path, 'wb') as f:
            await f.write(audio_data)

        print(f'已创建并下载{temp_file_path}')

        # 将临时文件路径加入队列
        audio_queue.put(temp_file_path)

        # 如果当前没有音频在播放，则播放队列中的音频
        # if not playing:
        #     await play_next_audio()

    except Exception as e:
        handle_error(e)

# 音频播放线程
def audio_player_thread():
    while True:
        if not audio_queue.empty():

            # 获取队列中的临时文件路径
            temp_file_path = audio_queue.get()

            # 播放音频
            audio = AudioSegment.from_mp3(temp_file_path)
            play(audio)

            # 删除临时文件
            print(f'开始删除{temp_file_path}')
            os.remove(temp_file_path)
            print(f'已删除{temp_file_path}')

            # 等待一秒
            time.sleep(1)

# async def play_next_audio():
#     global playing
#     if not audio_queue.empty():

#         # 获取队列中的临时文件路径
#         temp_file_path = audio_queue.get()

#         # 创建一个Event，用于等待音频播放完毕
#         audio_played_event = asyncio.Event()

#         # 在线程中播放音频
#         def play_in_thread(temp_file_path, event):
#             audio = AudioSegment.from_mp3(temp_file_path)
#             play(audio)
#             # 删除临时文件
#             print(f'开始删除{temp_file_path}')
#             os.remove(temp_file_path)
#             print(f'已删除{temp_file_path}')
#             event.set()  # 音频播放完毕后设置事件
        
#         # 启动线程播放音频
#         threading.Thread(target=play_in_thread, args=(temp_file_path, audio_played_event)).start()
        
#         # 等待音频播放完毕
#         await audio_played_event.wait()
        
#         # 播放完一个音频后，继续播放下一个
#         await asyncio.sleep(1)  # 等待1秒
#         await play_next_audio()

# 异步处理WebSocket消息
async def handle_ws_messages(ws):
    async for message in ws:
        data = json.loads(message)
        if data['Type'] == 1:  # 消息类型为弹幕消息
            # 创建线程并传递异步任务作为回调函数
            thread = threading.Thread(target=asyncio.run, args=(process_message_async(data), ))
            thread.start()

# 异步处理弹幕消息
async def process_message_async(data):
    # 将Data字段中的转义双引号替换为未转义的双引号
    data_content = data['Data'].replace('\\"', '"')
    # 解析处理后的JSON
    data_json = json.loads(data_content)
    nickname = data_json['User']['Nickname']
    content = data_json['Content']
    # 将信息填入content
    model_api_payload['messages'][0]['content'] = f"{nickname}: {content}"
    print(model_api_payload)

    # 创建aiohttp.ClientSession对象
    async with aiohttp.ClientSession() as session:
        response = await ai_fetch_async(session, model_api_url, model_api_payload)
        print(response['choices'][0]['message']['content'])
        audio_url = await tts_fetch_async(session, tts_api_url, response['choices'][0]['message']['content'])
        print(audio_url['result_audio_url'])
        await play_audio_async(session, audio_url['result_audio_url'])

# 启动WebSocket客户端
async def main():
    # 创建并启动音频播放线程
    player_thread = threading.Thread(target=audio_player_thread)
    player_thread.daemon = True  # 设置为守护线程
    player_thread.start()
    while True:
        try:
            async with websockets.connect(ws_url) as ws:
                print('ws链接已开启.')
                await handle_ws_messages(ws)
        except websockets.exceptions.ConnectionClosedError as e:
            print(f"ws链接被关闭: {e}. 即将重试...")
            # await asyncio.sleep(1)
        except Exception as e:
            print(f"发生意外错误: {e}")
            break
    # 等待队列中的所有任务完成
    audio_queue.join()

# 运行主函数
asyncio.run(main())


